// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
	"fmt"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/monitoring"

	"github.com/elastic/beats/filebeat/channel"
	"github.com/elastic/beats/filebeat/config"
	"github.com/elastic/beats/filebeat/crawler"
	"github.com/elastic/beats/filebeat/registrar"
)

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
	cfg  *config.Config // parsed configuration, loaded from fswatch.yaml in New
	done chan struct{}  // closed by Stop to signal Run to begin shutdown
	p    Pipeline       // publisher pipeline implementation injected via New
}

// configFile is the fixed on-disk location of the beat's configuration.
const configFile = "fswatch.yaml"

// New creates a new Filebeat pointer instance.
//
// It loads and unpacks the configuration from configFile on top of
// config.DefaultConfig, and wires in the supplied publisher pipeline p.
// The returned Filebeat is ready to have Run called on it.
func New(p Pipeline) (*Filebeat, error) {
	rawcfg, err := common.LoadFile(configFile)
	if err != nil {
		// Distinct message from the unpack failure below so the two
		// failure modes can be told apart from logs.
		return nil, fmt.Errorf("error loading config file %s: %v", configFile, err)
	}
	// Start from the defaults so options absent from the file keep
	// their documented default values.
	cfg := config.DefaultConfig
	if err := rawcfg.Unpack(&cfg); err != nil {
		return nil, fmt.Errorf("error unpacking config file %s: %v", configFile, err)
	}
	fb := &Filebeat{
		done: make(chan struct{}),
		cfg:  &cfg,
		p:    p,
	}
	return fb, nil
}

// Run allows the beater to be run as a beat.
//
// It wires together registrar, publisher ACK handling, outlet factory and
// crawler, then blocks until fb.done is closed (see Stop). Teardown happens
// in the inverse order of startup via the deferred calls below.
func (fb *Filebeat) Run() error {
	var err error
	cfg := fb.cfg

	// Minimal beat shell: identifies this beat and exposes the injected
	// publisher pipeline to the components started below.
	b := beat.Beat{
		Info: beat.Info{
			Beat:    "tlog",
			Version: "7.6.0",
			Name:    "tlogbeat",
		},
		Publisher: NewPipelineFactory(fb.p),
	}

	waitFinished, waitEvents := newSignalWait(), newSignalWait()

	// count active events for waiting on shutdown
	wgEvents := &eventCounter{
		count: monitoring.NewInt(nil, "filebeat.events.active"),
		added: monitoring.NewUint(nil, "filebeat.events.added"),
		done:  monitoring.NewUint(nil, "filebeat.events.done"),
	}
	finishedLogger := newFinishedLogger(wgEvents)

	// Setup registrar to persist state
	registrar, err := registrar.New(cfg.Registry, finishedLogger)
	if err != nil {
		logp.Err("Could not init registrar: %v", err)
		return err
	}

	// Make sure all events that were published get their state forwarded
	// to the registrar once they are ACKed by the pipeline.
	registrarChannel := newRegistrarLogger(registrar)

	err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
		ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
	})
	if err != nil {
		logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
		return err
	}

	// outDone closes down all active pipeline connections
	outDone := make(chan struct{})
	outletFactory := channel.NewOutletFactory(outDone, wgEvents, b.Info)

	crawler, err := crawler.New(
		outletFactory.Create,
		cfg.Inputs,
		b.Info.Version,
		fb.done,
		false) // once
	if err != nil {
		logp.Err("Could not init crawler: %v", err)
		return err
	}

	// The order of starting and stopping is important. Stopping is inverted to the starting order.
	// The current order is: registrar, publisher, spooler, crawler
	// That means, crawler is stopped first.

	logp.Info("Start registrar")

	// Start the registrar
	err = registrar.Start()
	if err != nil {
		return fmt.Errorf("Could not start registrar: %v", err)
	}

	logp.Info("Start registrar success")

	// Stopping registrar will write last state.
	// NOTE: defers run LIFO, so the registrar is stopped LAST — after the
	// publisher connections below are drained — matching the ordering
	// comment above.
	defer registrar.Stop()

	// Stopping publisher (might potentially drop items)
	defer func() {
		// Closes first the registrar logger to make sure not more events arrive at the registrar
		// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
		registrarChannel.Close()
		close(outDone) // finally close all active connections to publisher pipeline
	}()

	// Wait for all events to be processed or timeout
	defer waitEvents.Wait()

	err = crawler.Start(
		b.Publisher,
		registrar,
		cfg.ConfigInput,
		nil, nil, false)
	if err != nil {
		// Crawler failed mid-start: stop whatever it already launched
		// before unwinding via the defers above.
		crawler.Stop()
		return err
	}

	logp.Info("Start crawler success")

	// Add done channel to wait for shutdown signal
	waitFinished.AddChan(fb.done)
	waitFinished.Wait()

	crawler.Stop()

	return nil
}

// Stop is called on exit to stop the crawling, spooling and registration processes.
//
// It only signals shutdown: closing fb.done unblocks the wait inside Run,
// whose deferred calls then perform the actual teardown in order.
func (fb *Filebeat) Stop() {
	logp.Info("Stopping filebeat")

	// Closing (never sending on) the channel broadcasts the shutdown
	// signal to every component that was handed fb.done.
	close(fb.done)
}
